package xdb;

import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Lock;

/**
 * <pre>
 * XDB 并发分析和优化
 *
 * 事务的开始
 *    略。
 *    总结： 事务是什么。
 *
 * 事务的实现
 *    Table 需要实现三个基本操作，add, remove, get, 数据层是两个Map。一个是 MapCache，处于XDB的控制下；
 *    另一个是 MapData，就是 db.h 的表，是 xDB 不能控制的。先不考虑把修改保存到 MspData 中时，为了能让
 *    事务工作，最终的结果是得到TRecord的5个状态。
 *    此时，所有的事务都能工作了，只是所有的修改都不会被存下来。整个数据层次如下：
 *    总结： TTable(add, remove, get) on TTableCache + TStorage
 *
 * 保存和事务并发
 *    所有的事务访问都在 Tables.flushReadLock 内执行。保存在 Tables.flushWriteLock 内执行，
 *    保存过程包括，系列化(marshal)，修改记录状态和Cache(snapshot)，写入(flush)。
 *    通过 flushLock 的保护，当保存开始时，实际上停止了所有的事务，防止把正在执行中的事务的脏数据保存下来。
 *    当保存完成以后，释放flushWriteLock，让事务能开始执行。
 *    这就是未优化版本的XDB的并发设计。
 *    总价： Tables.flushWriteLock { marshal + snapshot + flush }
 *
 * 性能问题
 *    由于保存需要停止所有事务，会造成并发上的瓶颈。优化的目的就是让保存时，停止事务的时间尽量短。
 *
 * 优化思路
 *    把 marshal 和 flush 操作放到 flushLock 之外执行。
 *    其中 marshal 采用记录锁和事务并发执行；
 *    snapshot 仍然在 flushLock 之内执行，由于剩下的工作很少，执行的很快；
 *    flush 就把 snapshot 的快照数据写入，几乎是不需要锁的。由于 snapshot 执行后，事务层就
 *    认为所有的数据已经保存好了，而这时记录实际上还没有刷新到Storage中，此时需要修改原来直接
 *    查询底层数据的接口，让它们先从正在保存快照中查询。这样snapshot就会被事务并发访问，需要保护。
 *
 * 总结： 整个实现从 Checkpoint.checkpoint(XdbConf ) 开始看.
 * </pre>
 */
final class Checkpoint extends ThreadHelper implements CheckpointMBean {
    private final xdb.util.Elapse elapse = new xdb.util.Elapse();
    private final Tables tables;
    // 只有 Checkpoint 线程会修改这些值。 JMX需要读取，用个 volatile 表示一下。
    private volatile long marshalNCount = 0;
    private volatile long marshal0Count = 0;
    private volatile long snapshotCount = 0;
    private volatile long flushCount = 0;
    private volatile long marshalNTotalTime = 0;
    private volatile long snapshotTotalTime = 0; // 包含 marshal0 的时间
    private volatile long flushTotalTime = 0;
    private volatile long lastFlushTime = 0; //时间标记，解决往前改时间导致MySQL缓存丢失数据的问题
    private volatile long nextMarshalTime;
    private volatile long nextCheckpointTime;
    private boolean checkpointNow = false; // 马上执行一次checkpoint
    private final Object checkpointWaitQueue = new Object(); // checkpoint执行等待队列。

    Checkpoint(Tables tables) {
        super("xdb.Checkpoint");

        this.tables = tables;

        long now = System.currentTimeMillis();
        XdbConf conf = Xdb.getInstance().getConf();

        //存盘起始时间做一个随机，避免所有服务器同时启动，存盘时间在同一个时间点，导致数据库IO有峰值
        nextMarshalTime = now + conf.getMarshalPeriod();
        nextCheckpointTime = now + conf.getCheckpointPeriod() + new Random(System.currentTimeMillis()).nextInt(conf.getCheckpointPeriod());

        Xdb.mbeans().register(this, "xdb:type=Xdb,name=Checkpoint");
    }

    @Override
    public void checkpoint(long waitTimeout) throws InterruptedException {
        synchronized (this.checkpointWaitQueue) {
            this.checkpointNow = true;
            this.wakeup();
            if (waitTimeout >= 0) {
                this.checkpointWaitQueue.wait(waitTimeout);
            }
        }
    }

    private boolean checkpointNow() {
        synchronized (this.checkpointWaitQueue) {
            return this.checkpointNow;
        }
    }

    private void checkpointDone() {
        synchronized (this.checkpointWaitQueue) {
            this.checkpointNow = false;
            this.checkpointWaitQueue.notifyAll();
        }
    }

    private void checkpoint(final long now, XdbConf conf) {
        try {
            if (conf.getMarshalPeriod() >= 0 && nextMarshalTime <= now && conf.isAutoMarshal()) {
                nextMarshalTime = now + conf.getMarshalPeriod();
                long start = System.nanoTime();
                int countMarshalN = 0;
                for (Storage storage : tables.getStorages()) {
                    countMarshalN += storage.marshalN();
                }
                this.marshalNCount += countMarshalN;
                this.marshalNTotalTime += System.nanoTime() - start;
                Trace.info("marshalN=*/" + countMarshalN);
            }

            // 如果备份正在进行，即使设置了checkpointNow，也不会马上执行，而是等待下一个循环再次尝试。
            final int checkpointPeriod = conf.getCheckpointPeriod();
            if (checkpointPeriod >= 0 && (this.checkpointNow() || nextCheckpointTime <= now)) {
                nextCheckpointTime = now + checkpointPeriod;
                checkpoint(conf);
            }

        } catch (Throwable e) {
            Trace.fatal("halt program", e);
            Runtime.getRuntime().halt(54321);
        }
    }

    private void checkpoint(XdbConf conf) {
        Trace.info("---------------- begin ----------------");
        final List<Storage> storages = this.tables.getStorages();

        //////////////////////////////////////////
        // marshalN
        if (conf.getMarshalN() < 1) {
            Trace.warn("marshalN disabled");
        }

        elapse.reset();
        for (int i = 1; i <= conf.getMarshalN(); ++i) {
            int countMarshalN = 0;
            for (Storage storage : storages) {
                countMarshalN += storage.marshalN();
            }
            this.marshalNCount += countMarshalN;
            Trace.info("marshalN=" + i + "/" + countMarshalN);
        }
        this.marshalNTotalTime += elapse.elapsed();

        //////////////////////////////////////////
        // snapshot
        {
            int countSnapshot = 0;
            int countMarshal0 = 0;
            Lock lock = tables.flushWriteLock();
            lock.lock();
            elapse.reset();
            try {
                for (Storage storage : storages) {
                    countMarshal0 += storage.marshal0();
                }
                for (Storage storage : storages) {
                    countSnapshot += storage.snapshot();
                }
            } finally {
                lock.unlock();
            }

            final long snapshotTime = elapse.elapsedAndReset();
            if (snapshotTime / 100000 > conf.getSnapshotFatalTime()) {
                Trace.fatal("snapshot time=" + snapshotTime + " snapshot=" + countSnapshot + " marshal0=" + countMarshal0);
            }

            this.marshal0Count += countMarshal0;
            this.snapshotTotalTime += snapshotTime;
            this.snapshotCount += countSnapshot;
            Trace.info("snapshot=" + countSnapshot + " marshal0=" + countMarshal0);
        }

        //////////////////////////////////////////
        // flush
        int countFlush = 0;
        for (Storage storage : storages) {
            lastFlushTime = Math.max(lastFlushTime + 1, System.currentTimeMillis());
            countFlush += storage.flush(lastFlushTime);
        }
        this.flushCount += countFlush;
        this.flushTotalTime += elapse.elapsedAndReset();
        Trace.info("flush=" + countFlush);

        this.checkpointDone();
        Trace.info("----------------- end -----------------");
    }

    @Override
    public void run() {
        XdbConf conf = Xdb.getInstance().getConf();
        while (super.isRunning()) {
            long now = System.currentTimeMillis();
            this.checkpoint(now, conf);
            super.sleepIdle(100); // 检测精度，相当于最小间隔。
        }

        // final checkpoint
        Trace.warn("final checkpoint begin");
        checkpoint(conf);
        Trace.warn("final checkpoint end");
    }

    // 监控调试

    @Override
    public long getCountMarshalN() {
        return marshalNCount;
    }

    @Override
    public long getCountMarshal0() {
        return marshal0Count;
    }

    @Override
    public long getCountFlush() {
        return this.flushCount;
    }

    @Override
    public long getTotalTimeFlush() {
        return this.flushTotalTime;
    }

    @Override
    public long getTotalTimeSnapshot() {
        return snapshotTotalTime;
    }

    @Override
    public long getTotalTimeMarshalN() {
        return marshalNTotalTime;
    }

    public long getNextFlushTime() {
        return nextMarshalTime;
    }

    public long getNextCheckpointTime() {
        return nextCheckpointTime;
    }

    @Override
    public String getTimeOfNextCheckpoint() {
        return Trace.dateFormat.format(this.getNextCheckpointTime());
    }

    @Override
    public String getTimeOfNextFlush() {
        return Trace.dateFormat.format(this.getNextFlushTime());
    }

    @Override
    public int getPeriodCheckpoint() {
        return xdb.Xdb.getInstance().getConf().getCheckpointPeriod();
    }

    @Override
    public void setPeriodCheckpoint(int period) {
        xdb.Xdb.getInstance().getConf().setCheckpointPeriod(period);
    }

    @Override
    public long getCountSnapshot() {
        return this.snapshotCount;
    }
}
